iT邦幫忙

第 12 屆 iThome 鐵人賽

DAY 24
0

Celery 是分散式的工作佇列系統,它可以用來作什麼呢?舉幾個例子來說,會比較容易理解:

  • 寄送大量的電子郵件
  • 網路爬蟲
  • 長時間運算
  • ...

也就是長時間、需要重試等等的工作,都適合使用工作佇列系統來處理。

工作佇列系統主體有幾個部分:

  1. worker:主要負責執行工作的程式
  2. broker:負責協調工作
  3. job:工作

運作時,會以訊息告知 broker 有工作要執行,訊息裡包含了工作所需的參數或內容,接著 broker 會把工作放到佇列裡。worker 會持續的監控佇列,在有工作時,就會拿出來處理。處理完成後,再把結果告知 broker 。

Celery 有以下優點:

  1. 分散式,可以有多個 broker、worker ,換言之,在 broker / worker 不足時,可以增加;太多的時候可以減少。
  2. 工作可以定期執行。
  3. 工作可以串接,也可以並行,組合成流程工作。
  4. 支援信號,可以在完成時通知傾聽訊號的函式。

那跟 Django 有什麼關係呢?網站處理 HTTP 請求,我們對 HTTP 請求的預期一般來說是越快越好,如果花太多時間,就會阻塞住網站的處理。所以,長時間的工作就會希望可以在背景執行,這個背景執行,就可以使用 Celery,把背景執行工作放到工作佇列裡,Celery worker 就會去執行。

專案網址:https://docs.celeryproject.org/en/stable/index.html

安裝

poetry add celery redis
sudo apt-get install rabbitmq-server

這裡也順便一起安裝 RabbitMQ 這個 Message Queue server 用來作為 Broker

設定

# settings.py
import environ
env = environ.Env()

# ...
REDIS_HOST = env('REDIS_HOST', default='localhost')
REDIS_PORT = env('REDIS_PORT', default=6379)
RABBITMQ_HOST = env('RABBITMQ_HOST', default='localhost')
CELERY_RESULT_BACKEND = 'redis://{redis_host}:{redis_port}/1'.format(
    redis_host=REDIS_HOST,
    redis_port=REDIS_PORT)
CELERY_BROKER_URL = 'amqp://guest@{rabbitmq_host}//'.format(
    rabbitmq_host=RABBITMQ_HOST)
CELERY_ENABLE_UTC = True
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# When using librabbitmq, must use PROTOCOL 1
# https://stackoverflow.com/questions/42081061/celery-rabbitmqwarning-mainprocess-received-and-deleted-unknown-message-wron/42561772
CELERY_TASK_PROTOCOL = 1

設定這邊,我們讀取環境變數裡的 REDIS_HOST, REDIS_PORT, RABBITMQ_HOST ,然後用 Redis 作為 Celery 的 Result backend ,Result backend 是用來存放結果的地方;用 RabbitMQ 作為 Celery 的 broker backend,這指的是訊息佇列的位置。

除了 result 與 broker 之外,我們也指定了 Celery 要用的時區、內容格式。最後一個 CELERY_TASK_PROTOCOL 則是自己所遇到的經驗,如果是使用 librabbitmq 的話,得設定為 1 ,否則 Celery 跟 RabbitMQ 溝通時會有警告訊息。

設定好以後,接下來就是怎麼使用了。

使用

第一步,得先準備 Celery app,這是在執行 celery 指令,會去呼叫的應用程式起點。

# django_ithome_ironman/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_ithome_ironman.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

注意最後一行的 app.autodiscover_tasks() ,這會自動去尋找程式裡有加上 task decorator 的函式。

第二步,修改 django_ithome_ironman/init.py ,import 我們剛剛加的 celery app

# django_ithome_ironman/__init__.py
from __future__ import absolute_import
from .celery import app as celery_app

__all__ = ['celery_app']

第三步,準備要執行的 Task,這裡我們定義一個簡單的 add,加上 shared_task decorator。

# news/tasks.py
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

第四步,開啟一個終端機視窗,這裡要先執行 celery worker

poetry run celery -A django_ithome_ironman worker -l info

最後,來呼叫我們寫的 add Task,再開啟一個終端機視窗,進入 shell:poetry run python manage.py shell

>>> from django_ithome_ironman.tasks import add
>>> x = add.apply_async(args=(3, 4,))
>>> print(f"x.result={x.result}")

這時就會在前面那個終端機視窗看到 add 函式被呼叫執行了。

這邊的 x 是一個 AsyncResult 類別的實體,必須要使用 x.result 才能取得函式執行的結果。

這次再試試看 countdown 這個參數,這個參數是指在指定秒數後才執行。

>>> z = add.apply_async(args=(4, 5, ), countdown=15)
>>> print(f"z.result={z.result}")

這時會發現 z.result 是空的,但經過 15秒以後再印一次,就有結果了。

結語

關於 Celery 我們就介紹到這邊,但 Celery 不只有這些,還有許多功能值得挖掘,例如:

  • Canvas :這可以用來規劃 Task 流程,讓 Task 可以順序執行,也可以並行處理。
  • Period Task:定期執行的 Task,類似 cron 的功用。
  • ...

這裡因為篇幅關係,就此打住,上述範例的程式碼在 https://github.com/elleryq/ithome-iron-2020-django/tree/day-24

參考資料


上一篇
23. whitenoise
下一篇
25. django-q
系列文
加速你的 Django 網站開發 - Django 的好用套件30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言